Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Telemetry 2][SNOW-899866] Enables reportKafkaPartitionStart/Usage telemetry for Streaming #694

Merged
merged 35 commits into from
Sep 13, 2023

Conversation

sfc-gh-rcheng
Copy link
Collaborator

@sfc-gh-rcheng sfc-gh-rcheng commented Aug 25, 2023

  • SnowflakeTelemetryChannelStatus becomes a telemetry object, similar to SnowflakeTelemetryPipeUsage
  • Creates SnowflakeTelemetryChannelCreation, similar to SnowflakeTelemetryPipeCreation
  • Unifies reportKafkaPartitionStart/Usage code paths and enables telemetry for Streaming
  • Adds additional code coverage to the TelemetryService
    https://snowflakecomputing.atlassian.net/browse/SNOW-899866

@sfc-gh-rcheng sfc-gh-rcheng changed the title [draft] Rcheng sstelem [Streaming Telemetry 2] Enables reportKafkaPartitionStart/Usage telemetry for Streaming Aug 30, 2023
@@ -38,10 +40,9 @@ public SnowflakeTelemetryServiceV2(Connection conn) {
this.telemetry = TelemetryClient.createTelemetry(conn);
}

@Override
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved reportKafkaPartitionUsage to the parent SnowflakeTelemetryService


/**
* Base Constructor. Accepts a tableName and StageName.
*
* @param tableName Checks for Nullability
*/
public SnowflakeTelemetryBasicInfo(final String tableName) {
public SnowflakeTelemetryBasicInfo(
final String tableName, SnowflakeTelemetryService.TelemetryType telemetryType) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass telemetry type here to simplify reportKafkaPartitionUsage/Creation code paths across snowpipe and streaming

@sfc-gh-rcheng sfc-gh-rcheng marked this pull request as ready for review September 7, 2023 21:00
Copy link
Contributor

@sfc-gh-tzhang sfc-gh-tzhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments, PTAL. This is super helpful for us to debug issues without asking for client logs, thanks!

@@ -289,6 +290,8 @@ public TopicPartitionChannel(
this.offsetPersistedInSnowflake,
this.processedOffset,
this.latestConsumerOffset);
this.telemetryServiceV2.reportKafkaPartitionStart(
new SnowflakeTelemetryChannelCreation(this.tableName, this.channelName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only place that we're creating channel? If no, we need to either cover all cases or update the metric name to something like partition start

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is the only place we create a channel. There are two constructors for TopicPartitionChannel. The test constructor calls into this constructor, so all TopicPartitionChannel object creations will go through this codepath

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we also reopen/create channel when the channel gets invalidated? Not sure if this is the same code path though

Copy link
Collaborator Author

@sfc-gh-rcheng sfc-gh-rcheng Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TopicPartitionChannel object does not get recreated when the channel is invalidated, so the current reportKafkaPartitionStart tracks when a new TopicPartitionChannel is created.

Alternatively we could emit a reportKafkaPartitionStart with each channel open, however I don't think this is a good idea because we can track this server side and we already have multiple channel open/close issues that may overload the telemetry service with events.

There are two code paths for opening channels: TopicPartitionChannel constructor and when we recover offset from snowflake (channel invalidation).

@@ -1044,6 +1047,9 @@ private SnowflakeStreamingIngestChannel openChannelForTable() {
public void closeChannel() {
try {
this.channel.close().get();

// telemetry and metrics
this.telemetryServiceV2.reportKafkaPartitionUsage(this.snowflakeTelemetryChannelStatus, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we send this somewhere periodically instead of just during close?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be ideal, but currently we don't have anything running periodically in the background. InsertRows is an option, but I'm worried about overloading the telemetry service since we have no control over how often the customer flushes their buffer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it something we plan to add in the future? I think we send the usage for Snowpipe periodically. Otherwise I feel like this is not very useful since there're no usage reported if there is no open/close on the channel

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have any plans to add that in the future. It could potentially be a part of the flush service.

Currently we not periodically send Snowpipe usage. reportKafkaPartition() is called on on close and when the cleaner runs, which only runs on the first insert to initialize a pipe

Copy link
Collaborator

@sfc-gh-japatel sfc-gh-japatel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly looks good, left some questions and suggestions. PTAL and let me know what you think. Thanks!

@sfc-gh-japatel
Copy link
Collaborator

Lets also verify the data we send to snowflake and confirm we dont change anything in snowpipe telemetry. You can pick any of the above gh action runs and verify the data in telemetry view in snowhouse. (I dont think anything has changed but just to be double sure)

Comment on lines 51 to 52
public class Streaming {
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";
Copy link
Collaborator

@sfc-gh-japatel sfc-gh-japatel Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. remove inner class. previous recommendation was to have a separate section/heading in this class which distinguishes between snowpipe and streaming. apologies if I was not clear before.
  2. lets clarify what tp here means. it is not intuitive but hopefully they read code and understand what tp means.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. discussed offline, added as a comment section
  2. I'll expand TP -> TopicPartition

@sfc-gh-rcheng
Copy link
Collaborator Author

Telemetry in snowhouse
image
image

* See {@link com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} for offset
* description
*/
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Collaborator

@sfc-gh-japatel sfc-gh-japatel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow up:

  1. merge gates are failing, please ensure they are passing before pushing
  2. doc changes - please create a jira for @sfc-gh-lema (for jmx)

LGTM, otherwise! Good stuff! Thank you

@sfc-gh-rcheng sfc-gh-rcheng changed the title [Streaming Telemetry 2] Enables reportKafkaPartitionStart/Usage telemetry for Streaming [Streaming Telemetry 2][SNOW-899866] Enables reportKafkaPartitionStart/Usage telemetry for Streaming Sep 13, 2023
@sfc-gh-rcheng
Copy link
Collaborator Author

Jira for documentation changes @sfc-gh-lema

@sfc-gh-rcheng sfc-gh-rcheng merged commit edb3eaf into master Sep 13, 2023
@sfc-gh-rcheng sfc-gh-rcheng deleted the rcheng-sstelem branch September 13, 2023 23:02
khsoneji pushed a commit to confluentinc/snowflake-kafka-connector that referenced this pull request Oct 12, 2023
khsoneji pushed a commit to confluentinc/snowflake-kafka-connector that referenced this pull request Oct 12, 2023
khsoneji pushed a commit to confluentinc/snowflake-kafka-connector that referenced this pull request Oct 12, 2023
khsoneji pushed a commit to confluentinc/snowflake-kafka-connector that referenced this pull request Oct 12, 2023
EduardHantig pushed a commit to streamkap-com/snowflake-kafka-connector that referenced this pull request Feb 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants